Cats Effect Semaphoreによるインメモリキャッシュストレージの実装
はじめに
キャッシュスタンピード は高負荷のワークロード下の並列コンピューティングシステムにおけるカスケード障害の1つとして知られています。この記事ではキャッシュスタンピードの対策としてCats Effect Semaphoreを用いた非同期ノンブロッキングロックによるキャッシュストレージを実装しました。
参考: 既知の対策
スタンピードに対するよく知られた対策についてはWikipediaのCache Stampede Mitigationで簡潔に紹介されています。
また複数ノード間でのスタンピード対策として「RedisConf17 - Internet Archive - Preventing Cache Stampede with Redis and XFetch」ではRedisを使ったProbabilistic early expirationの実装の詳細が紹介されています。
前提
今回はロックを使った対策を実装します。想定しているユースケースは以下の通りです。
- ワークスティーリングスケジューラー上で稼働しているWEB API上で使用する
- キャッシュはノード単位で保持する
- キャッシュ対象はアクセストークンのような有効期限付きのデータで保持するデータは高々1個である
今回の実装の方針
上記の前提においてロックを使ったキャッシュストレージを実装します。主たる方針は下記の通りです。
- キャッシュの更新はリソースを使用するタスクが(ロックを取得した上で)行う
- ロック待ちタスクがスレッドをブロックしないように非同期ノンブロッキングロックが可能なプリミティブ(cats.effect.concurrent.MVarやSemaphore)を選択する
- 更新時にはロックの取得前後でキャッシュの有効性を確認して有効な場合には更新を行わない(スタンピード抑制)
実装
上記の方針を元にした実装が下記になります。
package example.pool import cats.effect.Concurrent import cats.effect.concurrent.{Ref, Semaphore} import cats.implicits._ trait ResourcePool[F[_]] { def use[A](op: ResourcePool.Resource => F[A]): F[A] def empty: F[Unit] } object ResourcePool { def create[F[_]: Concurrent](implicit fetch: Fetch[F]): F[ResourcePool[F]] = for { sem <- Semaphore[F](1)(implicitly[Concurrent[F]]) ref <- Ref.of[F, Option[Resource]](None) } yield SemaphoreResourcePool[F](fetch)(sem, ref) sealed trait InvalidResourcePoolState trait Fetch[F[_]] { def fetch: F[Resource] } trait Resource { def expires: Boolean } private final case class SemaphoreResourcePool[F[_]: Concurrent]( fetch: Fetch[F] )(sem: Semaphore[F], ref: Ref[F, Option[Resource]]) extends ResourcePool[F] { override def use[A](op: Resource => F[A]): F[A] = ifUnavailable(update) *> ref.get >>= (_.fold[F[A]]( Concurrent[F].raiseError(ResourceDidNotFetched) )(op)) override def empty: F[Unit] = sem.withPermit(ref.set(None)) private def update: F[Unit] = sem.withPermit(fetchIfUnavailable) private def fetchIfUnavailable = ifUnavailable(fetch.fetch >>= (r => ref.set(Some(r)))) private def ifUnavailable[A](f: => F[A]): F[Unit] = expired >>= (Concurrent[F].whenA(_)(f)) private def expired = ref.get map (_.fold(true)(_.expires)) } final case object ResourceDidNotFetched extends RuntimeException with InvalidResourcePoolState object Fetch { def instance[F[_]](_fetch: => F[Resource]) = new Fetch[F] { override def fetch: F[Resource] = _fetch } } }
動作確認
これを以下のようなドライバーで動作確認してみます。
ここでは次の点を確認しています。
- 同じスケジューラー上で動作する別のキャッシュを使わないタスクの実行を妨げない
- キャッシュの更新を同時に行うのが1タスクだけ
package example.pool import cats.effect.{ContextShift, IO, Resource, Timer} import cats.implicits._ import java.util.concurrent.Executors import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object UpdateCache extends App { val resourceTTL = 20.millis val taskDuration = 50.millis val numThreads = 10 val numTasks = 5000 val fetchDuration = 100.millis implicit def fetch(implicit timer: Timer[IO]): ResourcePool.Fetch[IO] = ResourcePool.Fetch.instance[IO]( IO(println(s"*** fetching on ${Thread.currentThread().getName}***")) *> timer.sleep(fetchDuration) *> IO.pure(ExpiringResource.create(resourceTTL)) <* IO(println("** fetch done **")) ) implicit def timer(implicit ec: ExecutionContext): Timer[IO] = IO.timer(ec) implicit def contextShift(implicit ec: ExecutionContext): ContextShift[IO] = IO.contextShift(ec) val threadPool = Resource .make(IO(sc))(es => IO(es.shutdown())) .map(es => ExecutionContext.fromExecutor(es)) val run = threadPool.use { implicit ec => for { pool <- ResourcePool.create[IO] _ <- otherTask.foreverM.start _ <- pool.use(task).foreverM.start _ <- timer.sleep(3.seconds) } yield () } def sc = Executors.newWorkStealingPool() def task(_r: ResourcePool.Resource)(implicit timer: Timer[IO]): IO[Unit] = IO(println("executing task")) *> timer.sleep(taskDuration) def otherTask(implicit timer: Timer[IO]) = IO(println("executing other task")) *> timer.sleep(taskDuration) run.unsafeRunSync() }
実行時の標準出力は下記の通りです。上記の2点が実現できていることがわかります。
executing other task executing other task *** fetching on ForkJoinPool-1-worker-1*** executing other task executing other task ** fetch done ** executing task executing other task *** fetching on ForkJoinPool-1-worker-1*** executing other task executing other task ** fetch done ** executing task executing other task (略)
まとめ
キャッシュスタンピード対策としてcats.effect.concurrent.Semaphoreを使った非同期ノンブロッキングロックによるキャッシュストレージを実装しました。